package drds.data_propagate.parse;

import drds.data_propagate.binlog_event.secondary.position.EntryPosition;
import drds.data_propagate.binlog_event_filter.BinlogEventFilter;
import drds.data_propagate.binlog_event_filter.aviater.AviaterRegexBinlogEventFilter;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinator;
import drds.data_propagate.parse.multistage_coordinator.MultistageCoordinatorImpl;
import drds.data_propagate.parse.table_meta_data.DatabaseTableMetaDataStore;
import drds.data_propagate.parse.table_meta_data.TableMetaDataFactory;
import drds.data_propagate.parse.table_meta_data.TableMetaDataFactoryImpl;
import drds.data_propagate.parse.table_meta_data.TableMetaDataStore;
import lombok.Getter;
import lombok.Setter;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

/**
 * MySQL-specific base for event parsers.
 *
 * <p>Holds the connection charset, the query/row filter switches and the optional
 * table meta data store, and wires them into the {@link BinlogEventConvertToEntry}
 * and {@link MultistageCoordinatorImpl} instances it builds.
 */
public abstract class AbstractMysqlEventParser extends AbstractEventParser {

    // NOTE(review): name is misspelled ("OFFEST"); kept as-is because it is protected
    // and subclasses may reference it.
    protected static final long BINLOG_START_OFFEST = 4L;

    // Byte counter; presumably the raw binlog bytes received by this task - it is not
    // updated anywhere in this class. Final, so only a getter is generated.
    @Getter
    protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L);
    // Counter handed to the multistage coordinator (see buildMultistageCoordinator()).
    @Getter
    private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L);
    @Setter
    @Getter
    protected TableMetaDataFactory tableMetaDataFactory = new TableMetaDataFactoryImpl();
    // The setter is hand-written below because it may also build the store.
    @Getter
    protected boolean enableTableMetaDataCache = false;
    @Setter
    @Getter
    protected int tsdbSnapshotInterval = 24;
    @Setter
    @Getter
    protected int tsdbSnapshotExpire = 360;
    // The setter is hand-written below because it may also build the store.
    @Getter
    protected String tsdbSpringXml;
    @Setter
    @Getter
    protected TableMetaDataStore tableMetaDataStore;
    // Encoding information
    @Setter
    @Getter
    protected byte connectionCharsetNumber = (byte) 33;
    @Setter
    @Getter
    protected Charset connectionCharset = Charset.forName("UTF-8");
    @Setter
    @Getter
    protected boolean filterQueryDcl = false;
    @Setter
    @Getter
    protected boolean filterQueryDml = false;
    @Setter
    @Getter
    protected boolean filterQueryDdl = false;
    @Setter
    @Getter
    protected boolean filterRows = false;
    @Setter
    @Getter
    protected boolean filterTableError = false;
    @Setter
    @Getter
    protected boolean useDruidDdlFilter = true;

    /**
     * Creates a {@link BinlogEventConvertToEntry} configured from this parser's
     * current filters, charset and filter switches.
     *
     * @return the configured converter
     */
    protected BinlogParser buildBinlogParser() {
        BinlogEventConvertToEntry converter = new BinlogEventConvertToEntry();

        // instanceof is false for null, so no separate null check is required.
        if (binlogEventFilter instanceof AviaterRegexBinlogEventFilter) {
            converter.setSchemaNameAndTableNameWhiteFilter((AviaterRegexBinlogEventFilter) binlogEventFilter);
        }
        if (eventBlackFilter instanceof AviaterRegexBinlogEventFilter) {
            converter.setSchemaNameAndTableNameFilterBlackFilter((AviaterRegexBinlogEventFilter) eventBlackFilter);
        }

        converter.setCharset(connectionCharset);
        converter.setFilterQueryDcl(filterQueryDcl);
        converter.setFilterQueryDml(filterQueryDml);
        converter.setFilterQueryDdl(filterQueryDdl);
        converter.setFilterRows(filterRows);
        converter.setFilterTableError(filterTableError);
        converter.setUseDruidDdlParser(useDruidDdlFilter);
        return converter;
    }

    /**
     * Stores the white-list filter and, when it is an
     * {@link AviaterRegexBinlogEventFilter}, pushes it to the current binlog parser
     * and to a {@link DatabaseTableMetaDataStore} if those are in use.
     *
     * @param binlogEventFilter the new filter; may be {@code null}
     */
    public void setBinlogEventFilter(BinlogEventFilter binlogEventFilter) {
        super.setBinlogEventFilter(binlogEventFilter);

        // Propagate the filter change to the collaborators that cache it.
        if (binlogEventFilter instanceof AviaterRegexBinlogEventFilter) {
            if (binLogEventParser instanceof BinlogEventConvertToEntry) {
                ((BinlogEventConvertToEntry) binLogEventParser)
                        .setSchemaNameAndTableNameWhiteFilter((AviaterRegexBinlogEventFilter) binlogEventFilter);
            }

            if (tableMetaDataStore instanceof DatabaseTableMetaDataStore) {
                ((DatabaseTableMetaDataStore) tableMetaDataStore).setFilter(binlogEventFilter);
            }
        }
    }

    /**
     * Stores the black-list filter and, when it is an
     * {@link AviaterRegexBinlogEventFilter}, pushes it to the current binlog parser
     * and to a {@link DatabaseTableMetaDataStore} if those are in use.
     *
     * @param eventBlackFilter the new filter; may be {@code null}
     */
    public void setEventBlackFilter(BinlogEventFilter eventBlackFilter) {
        super.setEventBlackFilter(eventBlackFilter);

        // Propagate the filter change to the collaborators that cache it.
        if (eventBlackFilter instanceof AviaterRegexBinlogEventFilter) {
            if (binLogEventParser instanceof BinlogEventConvertToEntry) {
                ((BinlogEventConvertToEntry) binLogEventParser)
                        .setSchemaNameAndTableNameFilterBlackFilter((AviaterRegexBinlogEventFilter) eventBlackFilter);
            }

            if (tableMetaDataStore instanceof DatabaseTableMetaDataStore) {
                ((DatabaseTableMetaDataStore) tableMetaDataStore).setBlackFilter(eventBlackFilter);
            }
        }
    }

    /**
     * Rolls the table meta data store back to the specified position.
     *
     * @param entryPosition the position to roll back to
     * @return the result of {@code tableMetaDataStore.rollback(entryPosition)}, or
     *         {@code true} when no store is configured
     * @throws ParseException if a store is configured and the position has no
     *                        positive execute timestamp
     */
    protected boolean processTableMetaData(EntryPosition entryPosition) {
        if (tableMetaDataStore == null) {
            return true;
        }

        if (entryPosition.getExecuteTimestamp() == null || entryPosition.getExecuteTimestamp() <= 0) {
            throw new ParseException("use gtid and TableMetaData TSDB should be config executeTimestamp > 0");
        }
        return tableMetaDataStore.rollback(entryPosition);
    }

    /**
     * Builds the table meta data store if the cache is enabled and no store exists
     * yet, then starts the parent parser.
     */
    @Override
    public void start() throws ParseException {
        if (enableTableMetaDataCache && tableMetaDataStore == null) {
            // The system property is JVM-wide, so the build is serialized on a shared lock.
            synchronized (EventParser.class) {
                try {
                    // Set the channel currently being loaded; this variable is used
                    // when Spring looks up files during loading.
                    System.setProperty("canal.task.taskIdSequense", taskId);
                    // Initialize
                    tableMetaDataStore = tableMetaDataFactory.build(taskId, tsdbSpringXml);
                } finally {
                    System.setProperty("canal.task.taskIdSequense", "");
                }
            }
        }

        super.start();
    }

    /**
     * Releases the table meta data store (when the cache is enabled) and stops the
     * parent parser.
     */
    @Override
    public void stop() throws ParseException {
        try {
            if (enableTableMetaDataCache) {
                tableMetaDataFactory.destory(taskId);
                tableMetaDataStore = null;
            }
        } finally {
            // Always stop the parent, even if releasing the store failed; otherwise
            // a failing destory() would leave the parser running.
            super.stop();
        }
    }

    /**
     * Creates the multistage coordinator for this parser and shares the
     * publish-blocking-time counter with it.
     *
     * @return the configured coordinator
     */
    protected MultistageCoordinator buildMultistageCoordinator() {
        // NOTE(review): unchecked cast - throws ClassCastException if binLogEventParser
        // is not a BinlogEventConvertToEntry.
        MultistageCoordinatorImpl multistageCoordinator = new MultistageCoordinatorImpl(parallelBufferSize,
                parallelThreadSize, (BinlogEventConvertToEntry) binLogEventParser, eventList, taskId);
        multistageCoordinator.setEventsPublishBlockingTime(eventsPublishBlockingTime);
        return multistageCoordinator;
    }

    /**
     * Enables or disables the table meta data cache. Enabling it builds the store
     * immediately when none exists yet.
     *
     * @param enableTableMetaDataCache whether the cache should be used
     */
    public void setEnableTableMetaDataCache(boolean enableTableMetaDataCache) {
        this.enableTableMetaDataCache = enableTableMetaDataCache;
        buildTableMetaDataStoreIfAbsent();
    }

    /**
     * Sets the Spring XML passed to the table meta data factory. When the cache is
     * already enabled and no store exists yet, the store is built immediately.
     *
     * @param tsdbSpringXml the Spring XML handed to {@code tableMetaDataFactory.build}
     */
    public void setTsdbSpringXml(String tsdbSpringXml) {
        this.tsdbSpringXml = tsdbSpringXml;
        buildTableMetaDataStoreIfAbsent();
    }

    /** Builds the table meta data store when the cache is enabled and no store exists yet. */
    private void buildTableMetaDataStoreIfAbsent() {
        if (enableTableMetaDataCache && tableMetaDataStore == null) {
            // Initialize
            tableMetaDataStore = tableMetaDataFactory.build(taskId, tsdbSpringXml);
        }
    }

}
